package org.overlord.bam.active.collection.epn;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.naming.InitialContext;
import org.codehaus.jackson.annotate.JsonIgnore;
import org.mvel2.MVEL;
import org.overlord.bam.active.collection.ActiveCollectionSource;
import org.overlord.bam.epn.ContextualNodeListener;
import org.overlord.bam.epn.EPNManager;
import org.overlord.bam.epn.EventList;
import org.overlord.bam.epn.NotificationType;

/* loaded from: input_file:org/overlord/bam/active/collection/epn/EPNActiveCollectionSource.class */
/**
 * Active collection source that is populated from events delivered by an
 * {@link EPNManager} node listener.
 *
 * <p>Incoming notifications are filtered by the configured network, node and
 * notification type (a {@code null} setting matches anything). Matching events
 * are either inserted directly, or - when both a "group by" expression and a
 * positive aggregation duration are configured - collected per group key and
 * periodically reduced to a single entry per group by an MVEL aggregation
 * script.
 */
public class EPNActiveCollectionSource extends ActiveCollectionSource {
    private static final Logger LOG = Logger.getLogger(EPNActiveCollectionSource.class.getName());

    /** JNDI name used to find the EPN manager when none has been injected. */
    private static final String EPN_MANAGER_JNDI_NAME = "java:global/overlord-bam/EPNManager";

    private EPNManager _epnManager = null;
    private String _network = null;
    private String _node = null;
    private NotificationType _notifyType = null;
    private long _aggregationDuration = 0;
    private String _groupBy = null;
    private Serializable _groupByExpression = null;
    private String _aggregationScript = null;
    private Serializable _aggregationScriptExpression = null;

    /** Events awaiting aggregation, keyed by group; also used as the lock guarding itself. */
    private final Map<Object, List<Object>> _groupedEvents = new HashMap<Object, List<Object>>();

    private Aggregator _aggregator = null;
    private ClassLoader _contextClassLoader = null;
    private boolean _preinitialized = false;
    private final EPNACSNodeListener _listener = new EPNACSNodeListener();

    /**
     * Timer task that publishes the accumulated aggregate events every
     * aggregation duration. Each instance owns its own {@link Timer}.
     */
    public class Aggregator extends TimerTask {
        private final Timer _timer = new Timer();

        /**
         * Creates the task and schedules it at a fixed rate, using the
         * aggregation duration as both the initial delay and the period.
         */
        public Aggregator() {
            this._timer.scheduleAtFixedRate(this, EPNActiveCollectionSource.this._aggregationDuration, EPNActiveCollectionSource.this._aggregationDuration);
        }

        /**
         * Publishes the currently grouped events. Runtime failures are logged
         * rather than propagated, because an exception escaping a timer task
         * terminates the timer thread and would stop all further aggregation.
         */
        @Override // java.util.TimerTask, java.lang.Runnable
        public void run() {
            try {
                EPNActiveCollectionSource.this.publishAggregateEvents();
            } catch (RuntimeException e) {
                LOG.log(Level.SEVERE, "Failed to publish aggregate events", e);
            }
        }

        /**
         * Cancels this task and also the timer that drives it, so the timer's
         * background thread is released.
         *
         * @return the result of {@link TimerTask#cancel()}
         */
        @Override
        public boolean cancel() {
            boolean cancelled = super.cancel();
            this._timer.cancel();
            return cancelled;
        }
    }

    /**
     * Node listener registered with the EPN manager; routes relevant
     * notifications to either direct insertion or aggregation.
     */
    public class EPNACSNodeListener extends ContextualNodeListener {
        public EPNACSNodeListener() {
        }

        /**
         * @return the context class loader captured by the enclosing source
         *         during pre-initialization (may be {@code null} before then)
         */
        public ClassLoader getContextClassLoader() {
            return EPNActiveCollectionSource.this._contextClassLoader;
        }

        /**
         * Handles a notification from the EPN.
         *
         * @param str network name
         * @param str2 network version
         * @param str3 node name
         * @param notificationType type of the notification
         * @param eventList events carried by the notification
         */
        public void handleEvents(String str, String str2, String str3, NotificationType notificationType, EventList eventList) {
            if (!EPNActiveCollectionSource.this.isRelevant(str, str2, str3, notificationType)) {
                return;
            }
            boolean aggregate = EPNActiveCollectionSource.this._aggregationDuration > 0
                    && EPNActiveCollectionSource.this._groupByExpression != null;
            if (aggregate) {
                EPNActiveCollectionSource.this.aggregateEvents(str, str2, str3, notificationType, eventList);
            } else {
                EPNActiveCollectionSource.this.processNotification(str, str2, str3, notificationType, eventList);
            }
        }
    }

    /**
     * Sets the EPN manager to use; when left unset, {@link #init()} looks it
     * up from JNDI.
     *
     * @param ePNManager the manager
     */
    protected void setEPNManager(EPNManager ePNManager) {
        this._epnManager = ePNManager;
    }

    /**
     * @return the live map of events awaiting aggregation, keyed by group;
     *         callers must synchronize on the returned map when accessing it
     */
    @JsonIgnore
    protected Map<Object, List<Object>> getGroupedEvents() {
        return this._groupedEvents;
    }

    /** @param str name of the network to listen to ({@code null} matches any network when filtering) */
    public void setNetwork(String str) {
        this._network = str;
    }

    /** @return the configured network name */
    public String getNetwork() {
        return this._network;
    }

    /** @param str name of the node of interest ({@code null} matches any node) */
    public void setNode(String str) {
        this._node = str;
    }

    /** @return the configured node name */
    public String getNode() {
        return this._node;
    }

    /** @param notificationType notification type of interest ({@code null} matches any type) */
    public void setNotifyType(NotificationType notificationType) {
        this._notifyType = notificationType;
    }

    /** @return the configured notification type */
    public NotificationType getNotifyType() {
        return this._notifyType;
    }

    /** @param j aggregation period passed to the timer; values &lt;= 0 disable aggregation */
    public void setAggregationDuration(long j) {
        this._aggregationDuration = j;
    }

    /** @return the aggregation period */
    public long getAggregationDuration() {
        return this._aggregationDuration;
    }

    /** @param str MVEL expression evaluated against each event to derive its group key */
    public void setGroupBy(String str) {
        this._groupBy = str;
    }

    /** @return the "group by" expression source */
    public String getGroupBy() {
        return this._groupBy;
    }

    /** @param str classpath resource name of the MVEL aggregation script */
    public void setAggregationScript(String str) {
        this._aggregationScript = str;
    }

    /** @return the classpath resource name of the aggregation script */
    public String getAggregationScript() {
        return this._aggregationScript;
    }

    /**
     * Initializes the source: resolves the EPN manager (from JNDI when not
     * injected), compiles the "group by" expression, registers the node
     * listener, starts the aggregator when aggregation is configured, and
     * finally runs {@link #preInit()}.
     *
     * @throws Exception if the superclass initialization, the JNDI lookup or
     *         pre-initialization fails
     */
    public void init() throws Exception {
        super.init();
        if (LOG.isLoggable(Level.FINE)) {
            LOG.fine("Initializing EPN Active Collection Source");
        }
        if (this._epnManager == null) {
            try {
                this._epnManager = (EPNManager) new InitialContext().lookup(EPN_MANAGER_JNDI_NAME);
            } catch (Exception e) {
                LOG.log(Level.SEVERE, "Failed to obtain Event Processor Network Manager", e);
                throw e;
            }
        }
        // Compile before registering the listener: otherwise events arriving
        // in between would bypass aggregation, and a compilation failure
        // would leave a registered listener behind.
        if (this._groupBy != null) {
            this._groupByExpression = MVEL.compileExpression(this._groupBy);
        }
        if (LOG.isLoggable(Level.FINE)) {
            LOG.fine("Register node listener for network=" + this._network);
        }
        this._epnManager.addNodeListener(this._network, this._listener);
        if (this._groupBy != null && this._aggregationDuration > 0) {
            this._aggregator = new Aggregator();
        }
        preInit();
    }

    /**
     * One-time pre-initialization: loads and compiles the aggregation script
     * from the thread context class loader (if configured and not already
     * compiled) and captures that class loader for the node listener.
     * Subsequent calls return immediately.
     *
     * @throws Exception if the superclass pre-initialization fails or the
     *         script cannot be read
     */
    protected void preInit() throws Exception {
        if (this._preinitialized) {
            return;
        }
        this._preinitialized = true;
        super.preInit();
        if (LOG.isLoggable(Level.FINE)) {
            LOG.fine("Pre-Initializing EPN Active Collection Source (script=" + this._aggregationScript + " compiled=" + this._aggregationScriptExpression + ")");
        }
        if (this._aggregationScript != null && this._aggregationScriptExpression == null) {
            InputStream resourceAsStream = Thread.currentThread().getContextClassLoader().getResourceAsStream(this._aggregationScript);
            if (resourceAsStream == null) {
                LOG.severe("Unable to locate '" + this._aggregationScript + "'");
            } else {
                byte[] scriptBytes;
                try {
                    scriptBytes = readFully(resourceAsStream);
                } finally {
                    resourceAsStream.close();
                }
                // Decoded with the platform default charset, as before.
                this._aggregationScriptExpression = MVEL.compileExpression(new String(scriptBytes));
            }
        }
        this._contextClassLoader = Thread.currentThread().getContextClassLoader();
    }

    /**
     * Reads the stream until end-of-file. A single {@code read} sized by
     * {@code available()} is not sufficient, as neither is guaranteed to
     * cover the whole content.
     *
     * @param in the stream to drain; not closed by this method
     * @return all bytes read
     * @throws IOException if reading fails
     */
    private static byte[] readFully(InputStream in) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] chunk = new byte[4096];
        int count;
        while ((count = in.read(chunk)) != -1) {
            out.write(chunk, 0, count);
        }
        return out.toByteArray();
    }

    /**
     * Closes the source: unregisters the node listener and stops the
     * aggregator (including its timer thread) if one was started.
     *
     * @throws Exception if the superclass close or the unregistration fails
     */
    public void close() throws Exception {
        super.close();
        if (LOG.isLoggable(Level.FINE)) {
            LOG.fine("Closing EPN Active Collection Source");
        }
        // The manager is absent when init() never ran or failed at lookup.
        if (this._epnManager != null) {
            this._epnManager.removeNodeListener(this._network, this._listener);
        }
        if (this._aggregator != null) {
            this._aggregator.cancel();
            this._aggregator = null;
        }
    }

    /**
     * Determines whether a notification matches the configured filters. A
     * filter that is {@code null} matches everything.
     *
     * @param str network name
     * @param str2 network version (only logged, not used for filtering)
     * @param str3 node name
     * @param notificationType type of the notification
     * @return {@code true} if the notification should be handled
     */
    protected boolean isRelevant(String str, String str2, String str3, NotificationType notificationType) {
        boolean finest = LOG.isLoggable(Level.FINEST);
        if (finest) {
            LOG.finest("isRelevant network=" + str + " version=" + str2 + " node=" + str3 + " type=" + notificationType + "?");
        }
        // equals() is invoked on the configured (non-null) value so that a
        // null argument is treated as a mismatch instead of throwing.
        if (this._network != null && !this._network.equals(str)) {
            return false;
        }
        if (this._node != null && !this._node.equals(str3)) {
            return false;
        }
        if (this._notifyType != null && !this._notifyType.equals(notificationType)) {
            return false;
        }
        if (finest) {
            LOG.finest("isRelevant network=" + str + " version=" + str2 + " node=" + str3 + " type=" + notificationType + " TRUE");
        }
        return true;
    }

    /**
     * Inserts every event of the notification into the collection, without a key.
     *
     * @param str network name
     * @param str2 network version
     * @param str3 node name
     * @param notificationType type of the notification
     * @param eventList events to insert
     */
    protected void processNotification(String str, String str2, String str3, NotificationType notificationType, EventList eventList) {
        if (LOG.isLoggable(Level.FINEST)) {
            LOG.finest("processNotification network=" + str + " version=" + str2 + " node=" + str3 + " type=" + notificationType + " events=" + eventList);
        }
        Iterator<?> events = eventList.iterator();
        while (events.hasNext()) {
            insert(null, (Serializable) events.next());
        }
    }

    /**
     * Adds each event to the group identified by evaluating the "group by"
     * expression against it. Events whose key evaluates to {@code null} are
     * reported and dropped.
     *
     * @param str network name
     * @param str2 network version
     * @param str3 node name
     * @param notificationType type of the notification
     * @param eventList events to group
     */
    protected void aggregateEvents(String str, String str2, String str3, NotificationType notificationType, EventList eventList) {
        boolean finest = LOG.isLoggable(Level.FINEST);
        if (finest) {
            LOG.finest("aggregateEvents network=" + str + " version=" + str2 + " node=" + str3 + " type=" + notificationType + " events=" + eventList);
        }
        synchronized (this._groupedEvents) {
            Iterator<?> events = eventList.iterator();
            while (events.hasNext()) {
                Serializable event = (Serializable) events.next();
                if (finest) {
                    LOG.finest("Aggregating event: " + event);
                }
                Object key = MVEL.executeExpression(this._groupByExpression, event);
                if (finest) {
                    LOG.finest("Derived key '" + key + "' for event: " + event);
                }
                if (key == null) {
                    LOG.severe("Failed to evaluate expression '" + this._groupBy + "' on event: " + event);
                    continue;
                }
                List<Object> group = this._groupedEvents.get(key);
                if (group == null) {
                    group = new ArrayList<Object>();
                    this._groupedEvents.put(key, group);
                }
                group.add(event);
            }
        }
    }

    /**
     * Drains the grouped events and, for each group, runs the aggregation
     * script with the group's events bound to the variable {@code events};
     * a non-null script result is inserted into the collection without a key.
     * If no aggregation script has been compiled, the drained events are
     * reported and discarded.
     */
    protected void publishAggregateEvents() {
        Map<Object, List<Object>> pending = null;
        // Take a snapshot and clear under the lock so the (potentially slow)
        // script execution happens without blocking event delivery.
        synchronized (this._groupedEvents) {
            if (!this._groupedEvents.isEmpty()) {
                pending = new HashMap<Object, List<Object>>(this._groupedEvents);
                this._groupedEvents.clear();
            }
        }
        if (pending == null) {
            return;
        }
        if (this._aggregationScriptExpression == null) {
            LOG.severe("No aggregation script to process events: " + pending);
            return;
        }
        boolean finest = LOG.isLoggable(Level.FINEST);
        for (List<Object> groupEvents : pending.values()) {
            if (finest) {
                LOG.finest("publishAggregateEvents list=" + groupEvents);
            }
            Map<String, Object> variables = new HashMap<String, Object>();
            variables.put("events", groupEvents);
            Object result = MVEL.executeExpression(this._aggregationScriptExpression, variables);
            if (result == null) {
                LOG.severe("Aggregation script failed to return a result (network=" + this._network + " node=" + this._node + ")");
                if (finest) {
                    LOG.finest("Script=" + this._aggregationScript);
                    LOG.finest("List of Events=" + groupEvents);
                }
            } else {
                if (finest) {
                    LOG.finest("publishAggregateEvents result=" + result);
                }
                insert(null, result);
            }
        }
    }
}
